Socket异步编程之 Begin/End 模式

作者:陈广
日期:2018-3-26


上篇文章,我们讲解了使用最原始 Thread 进行编程,但 Thread 实际上现在已经很少使用了。用得比较多的是 Begin/End,即使现在查微软的网络编程文档,介绍的还是这种编程模型。它并不是最新的网络编程模型,而且使用和理解起来还比较困难、麻烦。但想到现在大多数项目用的都是这个模型,学习还是很有必要的。另外此处不再是授课内容,将使用最新 C# 语法。

建立连接

之前我们在 Thread 编程中需要自己手动创建线程做一系列操作,Begin/End 编程模型则不再需要你去创建线程,它会在内部使用线程完成一系列操作,最重要的是它使用的是线程池。在 Thread 编程模型中,我们使用Socket.Accept()方法接收连接,然后创建线程去处理这些连接。如果在短时间内有成百上千的连接,对这些连接一一创建线程显然会耗费大量服务器资源,而使用线程池当然是一个很好的选择。在异步模式下,服务器可以使用BeginAccept方法和EndAccept方法来接受客户端连接的任务,在客户端则通过BeginConnect方法和EndConnect方法来实现向服务器的连接请求。

单次连接

先来个最简单的,服务器只接受一个客户端连接。

服务器程序

在 Visual Studio 中新建一个控制台应用程序,使用如下命名空间:

using System;
using System.Net;
using System.Net.Sockets;

使用如下代码:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(5);
        listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);//开始侦听连接
        Console.WriteLine("服务器开始侦听...");
    }
    catch(Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//成功收到连接请求后调用的回调函数
static void AcceptCallback(IAsyncResult ar)
{
    Socket listener = (Socket)ar.AsyncState; //将参数ar还原为Socket
    Socket handler = listener.EndAccept(ar); //EndAccept对应BeginAccept,它会阻塞线程,直到收到连接后解除阻塞,并结束一个侦听周期,
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}

listener.BeginAccept()方法用于开始侦听客户端连接,它会去线程池开一个线程用于侦听服务,然后立即返回,继续执行下面的代码打印出服务器开始侦听...,这点在之后运行程序的结果中可验证。当有客户端发送连接请求后,则会自动调用AcceptCallback回调函数。

BeginAccept方法原型为:

public IAsyncResult BeginAccept(AsyncCallback callback, object state);

BeginAcceptEndAccept需要成对使用,本例在主线程中开始侦听时使用BeginAccept;在接收到连接后调用的回调函数中使用EndAccept阻塞工作线程,并在收到一个连接后解除阻塞,从而结束一个侦听周期。

从代码可知,这种编程机制已经非常古老,使用的还是最原始的委托。现在可以使用lambda表达式将回调函数直接写在BeginAccept()方法中:

listener.BeginAccept(new AsyncCallback((ar)=> {
    Socket s = (Socket)ar.AsyncState;
    Socket handler = s.EndAccept(ar);
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}), listener);

甚至可以使用闭包机制,直接调用主线程 Socket,不再需要参数传递:

listener.BeginAccept(new AsyncCallback((ar)=> {
    Socket handler = listener.EndAccept(ar);
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}), null);

以上两种方式测试可以使用,但实际使用时闭包会不会出问题,不得而知。之后的代码还是老老实实按照微软文档的方式,使用委托。

客户端程序

新建一个控制台应用程序,使用如下命名空间:

using System;
using System.Net;
using System.Net.Sockets;

使用如下代码:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    try
    {
        Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint point = new IPEndPoint(ip, 5000);
        s.BeginConnect(point, new AsyncCallback(ConnectCallback), s); //向服务器发起连接
        Console.WriteLine($"开始连接服务器 {ip.ToString()} ...");
    }
    catch(Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//连接成功后的回调函数
static void ConnectCallback(IAsyncResult ar)
{
    try
    {
        Socket s = (Socket)ar.AsyncState;
        s.EndConnect(ar);//连接结束
        Console.WriteLine($"成功连接服务器 {s.RemoteEndPoint.ToString()} ");
    }
    catch(Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

理解了BeginAcceptEndAccept,也就理解了BeginConnectEndConnect。两者使用上没什么分别,都是成对出现,一个开始,一个结束,甚至使用的委托都完全相同。

运行效果如下图所示:

从运行结果可知,开第一个客户端,服务器收到连接,但开第二个客户端,服务器就没有收到连接了。这是因为一个BeginConnect只能接收一个连接,而程序只执行了一次BeginConnect。要想重复接收,只能象之前处理的一样,使用while(true)不断循环执行BeginConnect

多客户端连接

由于BeginConnect并不阻塞程序,直接套while(true)肯定是行不通的,瞬间开几万条线程是分分钟的事。所以只能手动阻塞,微软示例中使用的是ManualResetEvent。这个类我们在多线程相关文章中并没有讲到,但它的使用方法和ManualResetEventSlim一样,请参考《线程同步》这篇文章。

ManualResetEvent的机制是发一个信号可以允许多条等待线程通过:

更改服务器代码如下:

public static ManualResetEvent allDone = new ManualResetEvent(false);//线程信号
static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(5);
        Console.WriteLine("服务器开始侦听...");
        while (true)
        {
            allDone.Reset();//关门
            listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);//开始侦听连接
            allDone.WaitOne();//阻塞主线程
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//成功收到连接请求后调用的回调函数
static void AcceptCallback(IAsyncResult ar)
{
    allDone.Set();//开门
    Socket listener = (Socket)ar.AsyncState;
    Socket handler = listener.EndAccept(ar);
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}

运行程序并直接使用上例中的客户端程序,多开几个,运行效果如下:

现在可以接收多个客户端的连接了。

画张图演示程序运行过程

虽然 ManualResetEvent 一次允许多条线程通过,但这里只有主线程调用,所以每次只会有主线程一条线程通过。所以执行过程如图所示:BeginAccept执行时会到线程池取线程进行侦听,同时到门口等着门打开,当客户端有连接请求时,在处理连接的同时会把门打开,使得主线程通过大门进入到下一个 Accept 周期,同时大门关闭。

从这段代码可知,虽然BeginAccept另外开了一条线程进行监听,但主线程还是会被allDone.WaitOne()阻塞住的,所以在实际应用中,还得要专门开一个线程来处理侦听连接。这就比之前用用线程处理同步的Accept多一个线程了,使用起来还是相对麻烦的。当然,这里用到线程池以及专门的机制,对I/O密集型操作当然更好,出错的几率更低。

数据的发送和接收

服务器程序

数据的接收与 Accept 和 Connect 机制类似,也是使用BeginReceiveEndReceive配对使用。

首先来看看BeginReceive方法原型:

public IAsyncResult BeginReceive(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);

其中:

这些参数后两个本文之前已经提到过。而前面的四个参数除了 offset 相信大家在上篇文章中全都使用过,只是在这里需要作为参数传递而已。offset 一般情况下设置为0即可。

更改服务器代码如下:

public class StateObject //将BeginReceive需要的的参数包装在此类中进行传递
{
    public Socket workSocket = null;
    public const int BufferSize = 1024;//缓冲区大小
    public byte[] buffer = new byte[BufferSize];//接收缓冲
}
public static ManualResetEvent allDone = new ManualResetEvent(false);//线程信号
static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(5);
        Console.WriteLine("服务器开始侦听...");
        while (true)
        {
            allDone.Reset();//关门
            listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);//开始侦听连接
            allDone.WaitOne();//阻塞主线程
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//成功收到连接请求后调用的回调函数
static void AcceptCallback(IAsyncResult ar)
{
    allDone.Set();//开门
    Socket listener = (Socket)ar.AsyncState;
    Socket handler = listener.EndAccept(ar);
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");

    StateObject state = new StateObject();
    state.workSocket = handler;
    handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
        new AsyncCallback(ReadCallback), state);
}
//收到 socket 发送的信息后触发的接收回调函数
static void ReadCallback(IAsyncResult ar)
{
    try
    {
        StateObject state = (StateObject)ar.AsyncState;
        Socket handler = state.workSocket;
        int count = handler.EndReceive(ar);//这句会阻塞程序,直接接收到数据为止
        string recvStr = Encoding.Unicode.GetString(state.buffer, 0, count);
        Console.WriteLine($"收到{handler.RemoteEndPoint.ToString()}发来的信息:{recvStr}");
        //进入到下一个等待接收周期
        handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
            new AsyncCallback(ReadCallback), state);
    }
    catch(Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

本例使用了一个内部类StateObjectBeginReceive所需参数包装起来并方便在回调函数中进行传递。

另外,我们也注意到,跟上一篇文章中的同步接收进行对比,这里没有使用while(true)循环接收数据,而是直接在回调函数内再一次调用BeginReceive而进入到下一个接收周期。然后本线程结束,接力棒交到下一个ReadCallback。有点递归的感觉。流程如下图所示:

客户端程序

数据的发送使用BeginSendEndSend配对使用。

public IAsyncResult BeginSend(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);

参数和之前的BeginReceive完全一样,这里就不再赘述了。

将客户端代码更改如下:

private static ManualResetEvent connectDone = new ManualResetEvent(false);//控制连接
private static ManualResetEvent sendDone = new ManualResetEvent(false);//控制发送
static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    try
    {
        Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint point = new IPEndPoint(ip, 5000);
        s.BeginConnect(point, new AsyncCallback(ConnectCallback), s); //向服务器发起连接
        Console.WriteLine($"开始连接服务器 {ip.ToString()} ...");
        connectDone.WaitOne();//等待连接成功

        Console.WriteLine("开始发送信息");
        for (int i = 0; i < 10; i++)
        {
            byte[] sendBuff = Encoding.Unicode.GetBytes($"消息 {i}");
            s.BeginSend(sendBuff, 0, sendBuff.Length, SocketFlags.None,
                new AsyncCallback(SendCallback), s);
            sendDone.WaitOne();//等待发送成功
            Thread.Sleep(1000);//挂起1秒,稍后删掉
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//连接成功后的回调函数
static void ConnectCallback(IAsyncResult ar)
{
    try
    {
        Socket s = (Socket)ar.AsyncState;
        s.EndConnect(ar);
        Console.WriteLine($"成功连接服务器 {s.RemoteEndPoint.ToString()} ");
        connectDone.Set();//开门,通知主线程连接完成
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
}
//发送回调函数
static void SendCallback(IAsyncResult ar)
{
    try
    {
        Socket s = (Socket)ar.AsyncState;
        int count = s.EndSend(ar);
        sendDone.Set();//开门,通知主线程继续发送信息
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

由于BeginSend是异步操作,为避免多条信息同时发送,使用sendDone控制顺序发送,使得一条信息发送完毕后再发送下一条。运行程序,效果如下图所示:

另外,使用connectDone等待连接成功,如果注释掉 Main 方法中的sendDone.WaitOne();,将会看到先打印发送消息,然后再打印连接成功。

粘包问题

将上例代码中的客户端 Main 方法中的Thread.Sleep(1000);这句代码删除,再次运行,结果如下图所示:

对比上例运行结果,我们发现发送的消息全连在一起了。这是因为发送速度太快,旧的数据还未接收,新的数据已经压入缓冲。这种现象叫粘包,前面讲的使用同步接收也会出现同样的问题,只是当时每发送一条消息后,都会停顿一段时间,所以没出现而已。要解决这类问题,只能手动划定边界了。

在客户端发消息时,在每条消息后面加一个’\n’作为结束标记,然后在服务器端解析消息时,以’\n’为界,将取出的字符串分段打印。

客户端发消息的for循环改为:

for (int i = 0; i < 1000; i++)
{   //给每条消息后面加一个'\n'作为结束标记
    byte[] sendBuff = Encoding.Unicode.GetBytes($"消息 {i}"+"\n");
    s.BeginSend(sendBuff, 0, sendBuff.Length, SocketFlags.None,
        new AsyncCallback(SendCallback), s);
    sendDone.WaitOne();
}

服务器端的ReadCallback方法代码改为:

static void ReadCallback(IAsyncResult ar)
{
    try
    {
        StateObject state = (StateObject)ar.AsyncState;
        Socket handler = state.workSocket;
        int count = handler.EndReceive(ar);
        string recvStr = Encoding.Unicode.GetString(state.buffer, 0, count);
        int i = 0, pos = 0;
        //以'\n'为界,取出每段字符
        while (i < recvStr.Length)
        {
            if (recvStr[i] == '\n')
            {
                string s = recvStr.Substring(pos, i - pos);
                Console.WriteLine(s);
                pos = i + 1;
            }
            i++;
        }
        //进入到下一个等待接收周期
        handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
            new AsyncCallback(ReadCallback), state);
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

运行程序,这里只贴出服务端结果:

结果看似没问题,但浏览全部收到的消息,可以发现,每隔一段就会出现字符丢失的情况。这是因为缓冲区满时,一条消息可能会被分成两段,第一段在这次发送,第二段在下次发送。使用当前算法使得第一段丢失。所以需要改进算法,将第一段保存下来,留到下次接收时接上。继续更改服务器代码:

更改StateObject类代码如下:

public class StateObject
{
    public Socket workSocket = null;
    public const int BufferSize = 1024;//缓冲区大小
    public byte[] buffer = new byte[BufferSize];//接收缓冲
    public string remainStr = ""; //上次接收的消息解析后的剩余部分
}

更改ReadCallback代码如下:

static void ReadCallback(IAsyncResult ar)
{
    try
    {
        StateObject state = (StateObject)ar.AsyncState;
        Socket handler = state.workSocket;
        int count = handler.EndReceive(ar);
        string recvStr = Encoding.Unicode.GetString(state.buffer, 0, count);
        int i = 0, pos = 0;
        //找到第一个结束标记,将第一段字符串取出跟上次剩余的字符串合并
        while (recvStr[i] != '\n')
        {
            i++;
        }
        Console.WriteLine($"{state.remainStr}{recvStr.Substring(0, i)}");
        pos = ++i;
        //以'\n'为界,取出每段字符
        while (i < recvStr.Length)
        {
            if (recvStr[i] == '\n')
            {
                string s = recvStr.Substring(pos, i - pos);
                Console.WriteLine(s);
                pos = i + 1;
            }
            i++;
        }
        //剩余的无'\n'结尾的字符存入state.remainStr供下次使用
        state.remainStr = recvStr.Substring(pos, i - pos);
        //进入到下一个等待接收周期
        handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
            new AsyncCallback(ReadCallback), state);
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

现在再运行程序,就不会有字符丢失了。

Begin/End 就介绍到这吧,也不做例子了。因为这并不是最新的编程模型,了解即可,以后看别人的源码可能会用到。